Skip to content

reprocess failed account events #6571

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kitsune/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -1128,6 +1128,7 @@ def filter_exceptions(event, hint):
DMS_UPDATE_L10N_CONTRIBUTOR_METRICS = config("DMS_UPDATE_L10N_CONTRIBUTOR_METRICS", default=None)
DMS_CLEANUP_EXPIRED_USERS = config("DMS_CLEANUP_EXPIRED_USERS", default=None)
DMS_CLEANUP_OLD_ACCOUNT_EVENTS = config("DMS_CLEANUP_OLD_ACCOUNT_EVENTS", default=None)
DMS_REPROCESS_FAILED_ACCOUNT_EVENTS = config("DMS_REPROCESS_FAILED_ACCOUNT_EVENTS", default=None)

PROD_DETAILS_CACHE_NAME = "product-details"
PROD_DETAILS_STORAGE = config(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@


class Command(BaseCommand):
help = "Process all unprocessed account events created within the given past number of days."
help = "Process all unprocessed account events created within the given number of hours."

def add_arguments(self, parser):
parser.add_argument(
"num_days_ago",
"--within-hours",
type=int,
default=24,
help=(
"The past number of days within which the "
"unprocessed account events have been created."
"The number of hours within which the unprocessed "
"account events have been created."
),
)

def handle(self, *args, **options):
process_unprocessed_account_events.delay(options["num_days_ago"])
process_unprocessed_account_events.delay(options["within_hours"])
44 changes: 30 additions & 14 deletions kitsune/users/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,26 @@

import waffle
from celery import shared_task
from django.db import transaction

from kitsune.products.models import Product
from kitsune.sumo.decorators import skip_if_read_only_mode
from kitsune.users.auth import FXAAuthBackend
from kitsune.users.models import AccountEvent
from kitsune.users.utils import anonymize_user, delete_user_pipeline

shared_task_with_retry = shared_task(
acks_late=True, autoretry_for=(Exception,), retry_backoff=2, retry_kwargs=dict(max_retries=4)
acks_late=True, autoretry_for=(Exception,), retry_backoff=2, retry_kwargs=dict(max_retries=3)
)


@shared_task_with_retry
@skip_if_read_only_mode
@transaction.atomic
def process_event_delete_user(event_id):
event = AccountEvent.objects.get(id=event_id)
try:
event = AccountEvent.objects.get(id=event_id, status=AccountEvent.UNPROCESSED)
except AccountEvent.DoesNotExist:
return

user = event.profile.user
event.profile = None
event.save(update_fields=["profile"])
Expand All @@ -33,9 +37,13 @@ def process_event_delete_user(event_id):


@shared_task_with_retry
@skip_if_read_only_mode
@transaction.atomic
def process_event_subscription_state_change(event_id):
event = AccountEvent.objects.get(id=event_id)
try:
event = AccountEvent.objects.get(id=event_id, status=AccountEvent.UNPROCESSED)
except AccountEvent.DoesNotExist:
return

body = json.loads(event.body)

last_event = AccountEvent.objects.filter(
Expand All @@ -60,9 +68,13 @@ def process_event_subscription_state_change(event_id):


@shared_task_with_retry
@skip_if_read_only_mode
@transaction.atomic
def process_event_password_change(event_id):
event = AccountEvent.objects.get(id=event_id)
try:
event = AccountEvent.objects.get(id=event_id, status=AccountEvent.UNPROCESSED)
except AccountEvent.DoesNotExist:
return

body = json.loads(event.body)

change_time = datetime.utcfromtimestamp(body["changeTime"] / 1000.0)
Expand All @@ -79,9 +91,13 @@ def process_event_password_change(event_id):


@shared_task_with_retry
@skip_if_read_only_mode
@transaction.atomic
def process_event_profile_change(event_id):
event = AccountEvent.objects.get(id=event_id)
try:
event = AccountEvent.objects.get(id=event_id, status=AccountEvent.UNPROCESSED)
except AccountEvent.DoesNotExist:
return

refresh_token = event.profile.fxa_refresh_token

fxa = FXAAuthBackend()
Expand All @@ -100,15 +116,15 @@ def process_event_profile_change(event_id):


@shared_task
def process_unprocessed_account_events(days):
def process_unprocessed_account_events(within_hours):
"""
Attempt to process all unprocessed account events that have been
created within the past "days" number of days.
created within the given number of hours.
"""
days_ago = datetime.now() - timedelta(days=days)
hours_ago = datetime.now() - timedelta(hours=within_hours)

for event in AccountEvent.objects.filter(
status=AccountEvent.UNPROCESSED, created_at__gte=days_ago
status=AccountEvent.UNPROCESSED, created_at__gte=hours_ago
):
match event.event_type:
case AccountEvent.DELETE_USER:
Expand Down
87 changes: 87 additions & 0 deletions kitsune/users/tests/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import json
from datetime import datetime
from unittest.mock import patch

from django.conf import settings
from django.contrib.auth.models import User
from django.db import DatabaseError
from waffle.testutils import override_switch

from kitsune.messages.utils import send_message
Expand All @@ -15,6 +17,7 @@
process_event_subscription_state_change,
)
from kitsune.users.tests import AccountEventFactory, GroupFactory, ProfileFactory, UserFactory
from kitsune.wiki.tests import ApprovedRevisionFactory


class AccountEventsTasksTestCase(TestCase):
Expand Down Expand Up @@ -67,6 +70,39 @@ def test_process_delete_user(self):

self.assertEqual(account_event.status, AccountEvent.PROCESSED)

@override_switch("enable-account-deletion", active=True)
def test_process_delete_user_atomicity(self):
"""Ensure that the processing of the delete user event is atomic."""
profile = ProfileFactory()
account_event = AccountEventFactory(
body=json.dumps({}),
event_type=AccountEvent.DELETE_USER,
status=AccountEvent.UNPROCESSED,
profile=profile,
)
rev = ApprovedRevisionFactory(creator=profile.user)

def event_save(*args, **kwargs):
event_save.call_count += 1
if event_save.call_count > 1:
raise DatabaseError()
return super(AccountEvent, account_event).save(*args, **kwargs)

event_save.call_count = 0

with patch("kitsune.users.tasks.AccountEvent.save") as event_save_mock:
event_save_mock.side_effect = event_save
with self.assertRaises(DatabaseError):
process_event_delete_user(account_event.id)

rev.refresh_from_db()
account_event.refresh_from_db()

self.assertEqual(account_event.profile, profile)
self.assertEqual(account_event.status, AccountEvent.UNPROCESSED)
self.assertEqual(rev.creator.username, profile.user.username)
self.assertTrue(User.objects.filter(id=profile.user.id).exists())

def test_process_subscription_state_change(self):
product_1 = ProductFactory(codename="capability_1")
product_2 = ProductFactory(codename="capability_2")
Expand Down Expand Up @@ -148,6 +184,36 @@ def test_process_subscription_state_change_out_of_order(self):
account_event_3.refresh_from_db()
self.assertEqual(account_event_3.status, AccountEvent.IGNORED)

def test_process_subscription_state_change_atomicity(self):
"""Ensure that the processing of the subscription state change is atomic."""
ProductFactory(codename="capability_1")
ProductFactory(codename="capability_2")
product = ProductFactory(codename="capability_3")
profile = ProfileFactory()
profile.products.add(product)
account_event = AccountEventFactory(
body=json.dumps(
{
"capabilities": ["capability_1", "capability_2"],
"isActive": True,
"changeTime": 1,
}
),
event_type=AccountEvent.SUBSCRIPTION_STATE_CHANGE,
status=AccountEvent.UNPROCESSED,
profile=profile,
)

with patch("kitsune.users.tasks.AccountEvent.save") as event_save_mock:
event_save_mock.side_effect = DatabaseError()
with self.assertRaises(DatabaseError):
process_event_subscription_state_change(account_event.id)

account_event.refresh_from_db()

self.assertEqual(account_event.status, AccountEvent.UNPROCESSED)
self.assertEqual(list(p.codename for p in profile.products.all()), ["capability_3"])

def test_process_password_change(self):
profile = ProfileFactory()
account_event_1 = AccountEventFactory(
Expand Down Expand Up @@ -179,3 +245,24 @@ def test_process_password_change(self):

self.assertEqual(profile.fxa_password_change, datetime.utcfromtimestamp(2))
self.assertEqual(account_event_2.status, AccountEvent.IGNORED)

def test_process_password_change_atomicity(self):
"""Ensure that the processing of the password change is atomic."""
profile = ProfileFactory()
account_event = AccountEventFactory(
body=json.dumps({"changeTime": 2000}),
event_type=AccountEvent.PASSWORD_CHANGE,
status=AccountEvent.UNPROCESSED,
profile=profile,
)

with patch("kitsune.users.tasks.AccountEvent.save") as event_save_mock:
event_save_mock.side_effect = DatabaseError()
with self.assertRaises(DatabaseError):
process_event_password_change(account_event.id)

profile.refresh_from_db()
account_event.refresh_from_db()

self.assertIs(profile.fxa_password_change, None)
self.assertEqual(account_event.status, AccountEvent.UNPROCESSED)
24 changes: 23 additions & 1 deletion scripts/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,35 @@ def job_cleanup_old_account_events():
day_of_week=0,
max_instances=1,
coalesce=True,
skip=(settings.READ_ONLY or not waffle.switch_is_active('cleanup-expired-users')),
skip=(settings.READ_ONLY or not waffle.switch_is_active("cleanup-expired-users")),
)
@babis.decorator(ping_after=settings.DMS_CLEANUP_EXPIRED_USERS)
def job_cleanup_expired_users():
call_command("cleanup_expired_users")


# Every 4 hours, 15 minutes after the hour.
@scheduled_job(
"cron",
month="*",
day="*",
hour="*/4",
minute="15",
max_instances=1,
coalesce=True,
skip=settings.READ_ONLY,
)
@babis.decorator(ping_after=settings.DMS_REPROCESS_FAILED_ACCOUNT_EVENTS)
def job_reprocess_failed_account_events():
"""
Re-process any account events created within the past 24 hours that remain
in the unprocessed state. Kicks off a Celery task that does the following:
* Gathers all unprocessed account events created within the past 24 hours.
* Kicks off a separate Celery task to reprocess each one.
"""
call_command("reprocess_failed_account_events --within-hours 24")


def run():
try:
schedule.start()
Expand Down